#!/usr/bin/python3
# autopkgtest check: enable/disable/configure units
# (C) 2015 Canonical Ltd.
# Author: Martin Pitt <martin.pitt@ubuntu.com>

import unittest
import subprocess
import os
import sys
import tempfile
from glob import glob

system_unit_dir = subprocess.check_output(
    ['pkg-config', '--variable=systemdsystemunitdir', 'systemd'],
    universal_newlines=True).strip()
systemd_sysv_install = os.path.join(os.path.dirname(system_unit_dir),
                                    'systemd-sysv-install')


class EnableTests(unittest.TestCase):
    def tearDown(self):
        # remove all traces from our test unit
        f = glob(system_unit_dir + '/test_enable*.service')
        f += glob(system_unit_dir + '/*/test_enable*.service')
        f += glob('/etc/systemd/system/test_enable*.service')
        f += glob('/etc/systemd/system/*/test_enable*.service')
        f += glob('/etc/init.d/test_enable*')
        f += glob('/etc/rc?.d/???test_enable*')
        [os.unlink(i) for i in f]
        subprocess.check_call(['systemctl', 'daemon-reload'])

    def create_unit(self, suffix='', enable=False):
        '''Create a test unit'''

        unit = os.path.join(system_unit_dir,
                            'test_enable%s.service' % suffix)
        with open(unit, 'w') as f:
            f.write('''[Unit]
Description=Testsuite unit %s
[Service]
ExecStart=/bin/echo hello
[Install]
WantedBy=multi-user.target
''' % suffix)

        if enable:
            os.symlink(unit, '/etc/systemd/system/multi-user.target.wants/' +
                       os.path.basename(unit))

        return unit

    def create_sysv(self, suffix='', enable=False):
        '''Create a test SysV script'''

        script = '/etc/init.d/test_enable%s' % suffix
        with open(script, 'w') as f:
            f.write('''/bin/sh
### BEGIN INIT INFO
# Provides:          test_enable%s
# Required-Start:    $remote_fs $syslog
# Required-Stop:     $remote_fs $syslog
# Default-Start:     2 3 4 5
# Default-Stop:      0 1 6
# Short-Description: Testsuite script%s
### END INIT INFO

echo hello
''' % (suffix, suffix))
        os.chmod(script, 0o755)

        if enable:
            subprocess.check_call(
                [systemd_sysv_install, 'enable', os.path.basename(script)])

    def assertEnabled(self, enabled, unit='test_enable.service'):
        '''assert that given unit has expected state'''

        systemctl = subprocess.Popen(['systemctl', 'is-enabled', unit],
                                     stdout=subprocess.PIPE,
                                     universal_newlines=True)
        out = systemctl.communicate()[0].strip()
        if enabled:
            self.assertEqual(systemctl.returncode, 0)
            self.assertEqual(out, 'enabled')
        else:
            self.assertEqual(systemctl.returncode, 1)
            self.assertEqual(out, 'disabled')

    def test_unit_enable(self):
        '''no sysv: enable unit'''

        self.create_unit()
        self.assertEnabled(False)
        # also works without .service suffix
        self.assertEnabled(False, unit='test_enable')

        subprocess.check_call(['systemctl', 'enable', 'test_enable'])

        self.assertEnabled(True)
        # also works without .service suffix
        self.assertEnabled(True, unit='test_enable')

        l = '/etc/systemd/system/multi-user.target.wants/test_enable.service'
        self.assertTrue(os.path.islink(l))
        self.assertEqual(os.readlink(l),
                         system_unit_dir + '/test_enable.service')

        # enable should be idempotent
        subprocess.check_call(['systemctl', 'enable', 'test_enable.service'])
        self.assertEnabled(True)

    def test_unit_disable(self):
        '''no sysv: disable unit'''

        self.create_unit(enable=True)
        self.assertEnabled(True)
        # also works without .service suffix
        self.assertEnabled(True, unit='test_enable')

        subprocess.check_call(['systemctl', 'disable', 'test_enable'])

        self.assertEnabled(False)
        # also works without .service suffix
        self.assertEnabled(False, unit='test_enable')

        l = '/etc/systemd/system/multi-user.target.wants/test_enable.service'
        self.assertFalse(os.path.islink(l))

        # disable should be idempotent
        subprocess.check_call(['systemctl', 'disable', 'test_enable.service'])
        self.assertEnabled(False)

    def test_unit_sysv_enable(self):
        '''with sysv: enable unit'''

        self.create_unit()
        self.create_sysv()
        self.assertEnabled(False)
        # also works without .service suffix
        self.assertEnabled(False, unit='test_enable')

        subprocess.check_call(['systemctl', 'enable', 'test_enable'])

        self.assertEnabled(True)
        # also works without .service suffix
        self.assertEnabled(True, unit='test_enable')

        l = '/etc/systemd/system/multi-user.target.wants/test_enable.service'
        self.assertTrue(os.path.islink(l))
        self.assertEqual(os.readlink(l),
                         system_unit_dir + '/test_enable.service')

        # enabled the sysv script
        l = glob('/etc/rc2.d/S??test_enable')
        self.assertEqual(len(l), 1, 'expect one symlink in %s' % repr(l))
        self.assertEqual(os.readlink(l[0]), '../init.d/test_enable')

        # enable should be idempotent
        subprocess.check_call(['systemctl', 'enable', 'test_enable.service'])
        self.assertEnabled(True)

    def test_unit_sysv_disable(self):
        '''with sysv: disable unit'''

        self.create_unit(enable=True)
        self.create_sysv(enable=True)
        self.assertEnabled(True)
        # also works without .service suffix
        self.assertEnabled(True, unit='test_enable')

        subprocess.check_call(['systemctl', 'disable', 'test_enable'])

        self.assertEnabled(False)
        # also works without .service suffix
        self.assertEnabled(False, unit='test_enable')

        l = '/etc/systemd/system/multi-user.target.wants/test_enable.service'
        self.assertFalse(os.path.islink(l))

        # disabled the sysv script
        l = glob('/etc/rc2.d/S??test_enable')
        self.assertEqual(l, [])

        # disable should be idempotent
        subprocess.check_call(['systemctl', 'enable', 'test_enable.service'])
        self.assertEnabled(True)

    def test_unit_alias_enable(self):
        '''no sysv: enable unit with an alias'''

        u = self.create_unit()
        with open(u, 'a') as f:
            f.write('Alias=test_enablea.service\n')

        self.assertEnabled(False)

        subprocess.check_call(['systemctl', 'enable', 'test_enable'])

        self.assertEnabled(True)

        # enablement symlink
        l = '/etc/systemd/system/multi-user.target.wants/test_enable.service'
        self.assertTrue(os.path.islink(l))
        self.assertEqual(os.readlink(l),
                         system_unit_dir + '/test_enable.service')

        # alias symlink
        l = '/etc/systemd/system/test_enablea.service'
        self.assertTrue(os.path.islink(l))
        self.assertEqual(os.readlink(l),
                         system_unit_dir + '/test_enable.service')

    def test_unit_alias_disable(self):
        '''no sysv: disable unit with an alias'''

        u = self.create_unit()
        with open(u, 'a') as f:
            f.write('Alias=test_enablea.service\n')
        os.symlink(system_unit_dir + '/test_enable.service',
                   '/etc/systemd/system/test_enablea.service')

        subprocess.check_call(['systemctl', 'disable', 'test_enable'])

        self.assertEnabled(False)

        # enablement symlink
        l = '/etc/systemd/system/multi-user.target.wants/test_enable.service'
        self.assertFalse(os.path.islink(l))

        # alias symlink
        l = '/etc/systemd/system/test_enablea.service'
        self.assertFalse(os.path.islink(l))

    def test_unit_sysv_alias_enable(self):
        '''with sysv: enable unit with an alias'''

        u = self.create_unit()
        with open(u, 'a') as f:
            f.write('Alias=test_enablea.service\n')
        self.create_sysv()

        self.assertEnabled(False)

        subprocess.check_call(['systemctl', 'enable', 'test_enable'])

        # enablement symlink
        l = '/etc/systemd/system/multi-user.target.wants/test_enable.service'
        self.assertTrue(os.path.islink(l))
        self.assertEqual(os.readlink(l),
                         system_unit_dir + '/test_enable.service')

        # alias symlink
        l = '/etc/systemd/system/test_enablea.service'
        self.assertTrue(os.path.islink(l))
        self.assertEqual(os.readlink(l),
                         system_unit_dir + '/test_enable.service')

        # enabled the sysv script
        l = glob('/etc/rc2.d/S??test_enable')
        self.assertEqual(len(l), 1, 'expect one symlink in %s' % repr(l))
        self.assertEqual(os.readlink(l[0]), '../init.d/test_enable')

        self.assertEnabled(True)

    def test_unit_sysv_alias_disable(self):
        '''with sysv: disable unit with an alias'''

        u = self.create_unit(enable=True)
        with open(u, 'a') as f:
            f.write('Alias=test_enablea.service\n')
        os.symlink(system_unit_dir + '/test_enable.service',
                   '/etc/systemd/system/test_enablea.service')
        self.create_sysv(enable=True)

        subprocess.check_call(['systemctl', 'disable', 'test_enable'])

        # enablement symlink
        l = '/etc/systemd/system/multi-user.target.wants/test_enable.service'
        self.assertFalse(os.path.islink(l))

        # alias symlink
        l = '/etc/systemd/system/test_enablea.service'
        self.assertFalse(os.path.islink(l))

        # disabled the sysv script
        l = glob('/etc/rc2.d/S??test_enable')
        self.assertEqual(l, [])

        self.assertEnabled(False)

    def test_sysv_enable(self):
        '''only sysv: enable'''

        self.create_sysv()
        subprocess.check_call(['systemctl', 'enable', 'test_enable'])

        # enabled the sysv script
        l = glob('/etc/rc2.d/S??test_enable')
        self.assertEqual(len(l), 1, 'expect one symlink in %s' % repr(l))
        self.assertEqual(os.readlink(l[0]), '../init.d/test_enable')

        # enable should be idempotent
        subprocess.check_call(['systemctl', 'enable', 'test_enable'])
        self.assertEnabled(True)

    def test_sysv_disable(self):
        '''only sysv: disable'''

        self.create_sysv(enable=True)
        subprocess.check_call(['systemctl', 'disable', 'test_enable'])

        # disabled the sysv script
        l = glob('/etc/rc2.d/S??test_enable')
        self.assertEqual(l, [])

        # disable should be idempotent
        subprocess.check_call(['systemctl', 'disable', 'test_enable'])
        self.assertEnabled(False)

    def test_unit_link(self):
        '''systemctl link'''

        with tempfile.NamedTemporaryFile(suffix='.service') as f:
            f.write(b'[Unit]\n')
            f.flush()
            subprocess.check_call(['systemctl', 'link', f.name])

            unit = os.path.basename(f.name)
            l = os.path.join('/etc/systemd/system', unit)
            self.assertEqual(os.readlink(l), f.name)

            # disable it again
            subprocess.check_call(['systemctl', 'disable', unit])
            # this should also remove the unit symlink
            self.assertFalse(os.path.islink(l))

    def test_unit_enable_full_path(self):
        '''systemctl enable a unit in a non-default path'''

        with tempfile.NamedTemporaryFile(suffix='.service') as f:
            f.write(b'''[Unit]
Description=test
[Service]
ExecStart=/bin/true
[Install]
WantedBy=multi-user.target''')
            f.flush()
            unit = os.path.basename(f.name)

            # now enable it
            subprocess.check_call(['systemctl', 'enable', f.name])
            self.assertEnabled(True, unit=unit)
            l = os.path.join('/etc/systemd/system', unit)
            self.assertEqual(os.readlink(l), f.name)
            enable_l = '/etc/systemd/system/multi-user.target.wants/' + unit
            self.assertEqual(os.readlink(enable_l), f.name)

            # disable it again
            subprocess.check_call(['systemctl', 'disable', unit])
            # self.assertEnabled(False) does not work as now systemd does not
            # know about the unit at all any more
            self.assertFalse(os.path.islink(enable_l))
            # this should also remove the unit symlink
            self.assertFalse(os.path.islink(l))


if __name__ == '__main__':
    unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout,
                                                     verbosity=2))
